Rxjava 基本概念
#rxjava
RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。
异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 Android 创造的 AsyncTask 和Handler ,其实都是为了让异步代码更加简洁。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。
RxJava 的观察者模式
- RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
- 与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。
- onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
- onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。Observable Subject:
定义:Observer和Subscriber是两个“消费”实体。说直白点Observable对应于观察者模式中的被观察者,而Observer和Subscriber对应于观察者模式中的观察者。Subscriber其实是一个实现了Observer的抽象类123456789Observable observable = Observable.create(new Observable.OnSubscribe<String>() {public void call(Subscriber<? super String> subscriber) {subscriber.onNext("Hello");subscriber.onNext("Hi");subscriber.onNext("Aloha");subscriber.onCompleted();}});
- Create: 创建一个ObServable
- just(T …)
- from(T[])/from(Iterable<? extends T>)将传入的数组或者Iterable拆分成Java对象依次发送
Observer
定义: 观察者它决定事件触发的时候有怎么样的行为
RxJava中规定当不再有新的事件发出时,可以调用onCompleted()方法作为标示;
当事件处理出现异常时框架自动触发onError()方法;
同时Observables支持链式调用,从而避免了回调嵌套的问题。1234567891011121314151617Observer<Object> observer = new Observer<Object>() {public void onCompleted() {}public void onError(Throwable e) {}public void onNext(Object s) {}};
Subscriber
除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。 Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的:
- 不仅基本使用方式一样,实质上,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。所以如果你只想使用基本功能,选择 Observer 和 Subscriber 是完全一样的。它们的区别对于使用者来说主要有两点:
- onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。
- unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态。 unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。
- Observable.subscribe()方法可以返回一个Subscription的对象,即我们每次订阅都会返回1234567891011121314151617public interface Subscription {/*** Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription* was received.* <p>* This allows unregistering an {@link Subscriber} before it has finished receiving all events (i.e. before* onCompleted is called).*/void unsubscribe();/*** Indicates whether this {@code Subscription} is currently unsubscribed.** @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise*/boolean isUnsubscribed();}
被观察者(Observable)订阅观察者(Observer)
|
|
完整的链式调用:
123456789101112131415161718192021222324252627 Observable.create(new Observable.OnSubscribe<Integer>() {public void call(Subscriber<? super Integer> subscriber) {for (int i = 0; i < 5; i++) {subscriber.onNext(i);}subscriber.onCompleted();}}).subscribe(new Observer<Integer>() {public void onCompleted() {System.out.println("onCompleted");}public void onError(Throwable e) {System.out.println("onError");}public void onNext(Integer item) {System.out.println("Item is " + item);}});
Subscriber
|
|
异步线程:
- subscribeOn() 指定observable 所在的线程
- observeOn() 指点杆observer 所在的线程
Lift
这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。首先看一下 lift() 的内部实现(仅核心代码):123456789101112// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {return Observable.create(new OnSubscribe<R>() {public void call(Subscriber subscriber) {Subscriber newSubscriber = operator.call(subscriber);newSubscriber.onStart();onSubscribe.call(newSubscriber);}});}
这段代码很有意思:它生成了一个新的 Observable 并返回,而且创建新 Observable 所用的参数 OnSubscribe 的回调方法 call() 中的实现竟然看起来和前面讲过的 Observable.subscribe() 一样!然而它们并不一样哟~不一样的地方关键就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的对象不同(高能预警:接下来的几句话可能会导致身体的严重不适)——
subscribe() 中这句话的 onSubscribe 指的是 Observable 中的 onSubscribe 对象,这个没有问题,但是 lift() 之后的情况就复杂了点。
当含有 lift() 时:
- lift() 创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了;
- 而同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe;
- 当用户调用经过 lift() 后的 Observable 的 subscribe() 的时候,使用的是 lift() 所返回的新的 Observable ,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那个 OnSubscribe;
- 而这个新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在这个 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一个新的 Subscriber(Operator 就是在这里,通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的『变换』代码以实现变换),然后利用这个新 Subscriber 向原始 Observable 进行订阅。
这样就实现了 lift() 过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。
精简掉细节的话,也可以这么说:在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。